/*
* Conditions Of Use 
* 
* This software was developed by employees of the National Institute of
* Standards and Technology (NIST), an agency of the Federal Government.
* Pursuant to title 15 Untied States Code Section 105, works of NIST
* employees are not subject to copyright protection in the United States
* and are considered to be in the public domain.  As a result, a formal
* license is not needed to use the software.
* 
* This software is provided by NIST as a service and is expressly
* provided "AS IS."  NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED
* OR STATUTORY, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT
* AND DATA ACCURACY.  NIST does not warrant or make any representations
* regarding the use of the software or the results thereof, including but
* not limited to the correctness, accuracy, reliability or usefulness of
* the software.
* 
* Permission to use this software is contingent upon your acceptance
* of the terms of this agreement
*  
* .
* 
*/
/*******************************************************************************
 *   Product of NIST/ITL Advanced Networking Technologies Division (ANTD).     *
 *******************************************************************************/
package gov.nist.javax.sip.stack;

import java.io.IOException;
import java.util.LinkedList;
import java.net.*;

import gov.nist.core.*;

/**
 * Sit in a loop and handle incoming UDP datagram messages. For each datagram
 * packet, a new UDPMessageChannel is created (up to the max thread pool size).
 * Each UDP message is processed in its own thread.
 * 
 * @version 1.2 $Revision: 1.31 $ $Date: 2008/11/24 23:59:04 $
 * 
 * @author M. Ranganathan  <br/>
 * 
 * 
 * 
 * <a href="{@docRoot}/../uml/udp-request-processing-sequence-diagram.jpg">
 * See the implementation sequence diagram for processing incoming requests.
 * </a>
 * 
 * 
 * Acknowledgement: Jeff Keyser contributed ideas on starting and stopping the
 * stack that were incorporated into this code. Niklas Uhrberg suggested that
 * thread pooling be added to limit the number of threads and improve
 * performance.
 */
public class UDPMessageProcessor extends MessageProcessor {

	/**
	 * High water mark for the queue size: at or above this length every
	 * incoming packet is dropped.
	 */
	private static final int HIGHWAT = 100;

	/**
	 * Low water mark for the queue size: above this length (and below
	 * {@link #HIGHWAT}) incoming packets are dropped at random.
	 */
	private static final int LOWAT = 50;

	/**
	 * The mapped port (in case STUN support is enabled).
	 */
	private int port;

	/**
	 * Incoming messages are queued here. This list is also the monitor used
	 * for wait/notify; every access must hold its lock.
	 */
	protected LinkedList messageQueue;

	/**
	 * A list of message channels that we have started.
	 */
	protected LinkedList messageChannels;

	/**
	 * Max # of udp message channels. NOTE(review): this field is never
	 * assigned in this class; run() reads sipStack.threadPoolSize instead.
	 */
	protected int threadPoolSize;

	/**
	 * Max datagram size.
	 */
	protected static final int MAX_DATAGRAM_SIZE = 8 * 1024;

	/**
	 * Our stack (that created us).
	 */
	protected SIPTransactionStack sipStack;

	/**
	 * The datagram socket on which incoming packets are received.
	 */
	protected DatagramSocket sock;

	/**
	 * A flag that is set to false to exit the message processor (suggestion by
	 * Jeff Keyser). Declared volatile because stop() writes it from a different
	 * thread than the one that polls it in run().
	 */
	protected volatile boolean isRunning;

	/**
	 * Constructor. Creates the datagram socket and configures its receive
	 * buffer size and, when the thread auditor is enabled, its read timeout.
	 * 
	 * @param ipAddress
	 *            address to bind the socket to.
	 * @param sipStack
	 *            pointer to the stack.
	 * @param port
	 *            port to bind the socket to.
	 * @throws IOException
	 *             if the socket cannot be created or configured; the
	 *             underlying SocketException is attached as the cause.
	 */
	protected UDPMessageProcessor(InetAddress ipAddress,
			SIPTransactionStack sipStack, int port) throws IOException {
		super(ipAddress, port, "udp");

		this.sipStack = sipStack;
		this.messageQueue = new LinkedList();
		this.port = port;

		try {
			// Create a new datagram socket.
			this.sock = sipStack.getNetworkLayer().createDatagramSocket(port,
					ipAddress);
			sock.setReceiveBufferSize(MAX_DATAGRAM_SIZE);

			// If the thread auditor is enabled, define a socket timeout value
			// in order to prevent sock.receive() from blocking forever.
			if (sipStack.getThreadAuditor().isEnabled()) {
				sock.setSoTimeout((int) sipStack.getThreadAuditor()
						.getPingIntervalInMillisecs());
			}

			String hostAddress = ipAddress.getHostAddress();
			if (hostAddress.equals(IN_ADDR_ANY)
					|| hostAddress.equals(IN6_ADDR_ANY)) {
				// Store the address to which we are actually bound.
				// Note that on WINDOWS this is actually broken. It will
				// return IN_ADDR_ANY again. On linux it will return the
				// address to which the socket was actually bound.
				super.setIpAddress(sock.getLocalAddress());
			}
		} catch (SocketException ex) {
			// Do not leak the socket if it was created but could not be
			// configured.
			if (this.sock != null) {
				this.sock.close();
			}
			// Keep the original exception as the cause so the stack trace is
			// not lost.
			IOException wrapped = new IOException(ex.getMessage());
			wrapped.initCause(ex);
			throw wrapped;
		}
	}

	/**
	 * Get port on which to listen for incoming stuff.
	 * 
	 * @return port on which I am listening.
	 */
	public int getPort() {
		return this.port;
	}

	/**
	 * Start our processor thread. The thread is a daemon named
	 * "UDPMessageProcessorThread" running at maximum priority.
	 * 
	 * @throws IOException
	 *             declared for callers; not thrown by this implementation.
	 */
	public void start() throws IOException {
		this.isRunning = true;
		Thread thread = new Thread(this);
		thread.setDaemon(true);
		// Issue #32 on java.net
		thread.setName("UDPMessageProcessorThread");
		// Issue #184
		thread.setPriority(Thread.MAX_PRIORITY);
		thread.start();
	}

	/**
	 * Thread main routine. Receives datagrams until {@link #isRunning} is
	 * cleared, applies the congestion-control drop policy and hands each
	 * accepted packet either to the message queue (bounded thread pool) or to
	 * a freshly created UDPMessageChannel (unbounded, thread pool size -1).
	 */
	public void run() {
		this.messageChannels = new LinkedList();
		// Start all our messageChannels (unless the thread pool size is
		// infinity).
		if (sipStack.threadPoolSize != -1) {
			for (int i = 0; i < sipStack.threadPoolSize; i++) {
				UDPMessageChannel channel = new UDPMessageChannel(sipStack,
						this);
				this.messageChannels.add(channel);
			}
		}

		// Ask the auditor to monitor this thread
		ThreadAuditor.ThreadHandle threadHandle = sipStack.getThreadAuditor()
				.addCurrentThread();

		// Somebody asked us to exit if isRunning is set to false.
		while (this.isRunning) {
			try {
				// Let the thread auditor know we're up and running
				threadHandle.ping();

				// A fresh buffer per packet: accepted packets are queued and
				// consumed later, so the buffer cannot be reused.
				int bufsize = sock.getReceiveBufferSize();
				byte message[] = new byte[bufsize];
				DatagramPacket packet = new DatagramPacket(message, bufsize);
				sock.receive(packet);

				// Take one consistent snapshot of the queue length under the
				// queue's lock; the list is mutated by other threads.
				int queueSize;
				synchronized (this.messageQueue) {
					queueSize = this.messageQueue.size();
				}
				if (shouldDropPacket(queueSize)) {
					continue;
				}

				if (sipStack.threadPoolSize != -1) {
					// Note: the only condition watched for by threads
					// synchronizing on the messageQueue member is that it is
					// not empty. As soon as you introduce some other
					// condition you will have to call notifyAll instead of
					// notify below.
					synchronized (this.messageQueue) {
						this.messageQueue.add(packet);
						this.messageQueue.notify();
					}
				} else {
					new UDPMessageChannel(sipStack, this, packet);
				}
			} catch (SocketTimeoutException ex) {
				// This socket timeout allows us to ping the thread auditor
				// periodically.
			} catch (SocketException ex) {
				if (sipStack.isLoggingEnabled())
					getSIPStack().logWriter
							.logDebug("UDPMessageProcessor: Stopping");
				haltAndWakeWaiters();
			} catch (IOException ex) {
				// Wake waiting threads here too, as the SocketException branch
				// and stop() do, so they can observe that we stopped.
				haltAndWakeWaiters();
				ex.printStackTrace();
				if (sipStack.isLoggingEnabled())
					getSIPStack().logWriter
							.logDebug("UDPMessageProcessor: Got an IO Exception");
			} catch (Exception ex) {
				if (sipStack.isLoggingEnabled())
					getSIPStack().logWriter
							.logDebug("UDPMessageProcessor: Unexpected Exception - quitting");
				InternalErrorHandler.handleException(ex);
				return;
			}
		}
	}

	/**
	 * Simplistic congestion control. Accepts packets while the queue length is
	 * at most LOWAT, drops everything once it reaches HIGHWAT, and in between
	 * drops with a probability that grows linearly from 0 to 1.
	 * 
	 * TODO -- penalize spammers by looking at the source port and IP address.
	 * 
	 * @param queueSize
	 *            snapshot of the current message queue length.
	 * @return true if the packet just received must be discarded.
	 */
	private boolean shouldDropPacket(int queueSize) {
		if (queueSize >= HIGHWAT) {
			if (sipStack.logWriter.isLoggingEnabled()) {
				sipStack.logWriter
						.logDebug("Dropping message -- queue length exceeded");
			}
			return true;
		}
		if (queueSize <= LOWAT) {
			return false;
		}

		// LOWAT < queueSize < HIGHWAT: random early drop.
		float dropProbability = ((float) (queueSize - LOWAT))
				/ ((float) (HIGHWAT - LOWAT));
		if (Math.random() < dropProbability) {
			if (sipStack.logWriter.isLoggingEnabled()) {
				// Log the probability actually used for the drop decision.
				sipStack.logWriter
						.logDebug("Dropping message with probability \t"
								+ dropProbability);
			}
			return true;
		}
		return false;
	}

	/**
	 * Clears the running flag and wakes every thread waiting on the message
	 * queue. The notifyAll must be in a synchronized block (bug report by
	 * Niklas Uhrberg).
	 */
	private void haltAndWakeWaiters() {
		synchronized (this.messageQueue) {
			this.isRunning = false;
			this.messageQueue.notifyAll();
		}
	}

	/**
	 * Shut down the message processor. Close the socket for receiving incoming
	 * messages and wake every thread waiting on the message queue.
	 */
	public void stop() {
		synchronized (this.messageQueue) {
			this.isRunning = false;
			this.messageQueue.notifyAll();
			sock.close();
		}
	}

	/**
	 * Return the transport string.
	 * 
	 * @return the transport string
	 */
	public String getTransport() {
		return "udp";
	}

	/**
	 * Returns the stack.
	 * 
	 * @return my sip stack.
	 */
	public SIPTransactionStack getSIPStack() {
		return sipStack;
	}

	/**
	 * Create and return new UDPMessageChannel for the given host/port.
	 * 
	 * @param targetHostPort
	 *            host and port of the peer.
	 * @return a new UDP message channel bound to this processor.
	 * @throws UnknownHostException
	 *             if the target host cannot be resolved.
	 */
	public MessageChannel createMessageChannel(HostPort targetHostPort)
			throws UnknownHostException {
		return new UDPMessageChannel(targetHostPort.getInetAddress(),
				targetHostPort.getPort(), sipStack, this);
	}

	/**
	 * Create and return new UDPMessageChannel for the given address and port.
	 * 
	 * @param host
	 *            address of the peer.
	 * @param port
	 *            port of the peer.
	 * @return a new UDP message channel bound to this processor.
	 * @throws IOException
	 *             declared for callers of the MessageProcessor contract.
	 */
	public MessageChannel createMessageChannel(InetAddress host, int port)
			throws IOException {
		return new UDPMessageChannel(host, port, sipStack, this);
	}

	/**
	 * Default target port for UDP
	 * 
	 * @return 5060.
	 */
	public int getDefaultTargetPort() {
		return 5060;
	}

	/**
	 * UDP is not a secure protocol.
	 * 
	 * @return always false.
	 */
	public boolean isSecure() {
		return false;
	}

	/**
	 * UDP can handle a message as large as the MAX_DATAGRAM_SIZE.
	 * 
	 * @return MAX_DATAGRAM_SIZE.
	 */
	public int getMaximumMessageSize() {
		return MAX_DATAGRAM_SIZE;
	}

	/**
	 * Return true if there are any messages still waiting in the message
	 * queue.
	 * 
	 * @return true if the message queue is not empty.
	 */
	public boolean inUse() {
		synchronized (messageQueue) {
			return messageQueue.size() != 0;
		}
	}

}
